package com.atguigu.app.dwd;

import com.atguigu.common.Constant;
import com.atguigu.utils.KafkaUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

//数据流:web/app -> Nginx -> 业务服务器(Mysql) -> Maxwell -> Kafka(ODS) -> FlinkApp -> Kafka(DWD)
//程  序:Mock -> Mysql -> Maxwell -> Kafka(ZK) -> Dwd03_TradeCartAdd -> Kafka(ZK)
public class Dwd03_TradeCartAdd {

    public static void main(String[] args) {

        //1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //1.1 开启CK
//        env.enableCheckpointing(10000L);
//        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
//        checkpointConfig.setCheckpointTimeout(20000L);
//        checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/flink-ck");
//        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//        //checkpointConfig.setCheckpointInterval(10000L);
//        checkpointConfig.setMinPauseBetweenCheckpoints(5000L);
//        checkpointConfig.setMaxConcurrentCheckpoints(2);
//        //默认是int类型的最大值
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
//        env.setStateBackend(new HashMapStateBackend());
//
//        System.setProperty("HADOOP_USER_NAME", "atguigu");

        //2.使用FlinkSQL方式获取Kafka topic_db主题数据
        tableEnv.executeSql(KafkaUtil.getTopicDbDDL("dwd03_trade_cart_add_230315"));

        //3.过滤出加购数据
        Table resultTable = tableEnv.sqlQuery("" +
                "select\n" +
                "    `data`['id'] id,\n" +
                "    `data`['user_id'] user_id,\n" +
                "    `data`['sku_id'] sku_id,\n" +
                "    `data`['cart_price'] cart_price,\n" +
                "    if(`type`='insert',`data`['sku_num'],cast(cast(`data`['sku_num'] as bigint)-cast(`old`['sku_num'] as bigint) as string)) sku_num,\n" +
                "    `data`['img_url'] img_url,\n" +
                "    `data`['sku_name'] sku_name,\n" +
                "    `data`['is_checked'] is_checked,\n" +
                "    `data`['create_time'] create_time,\n" +
                "    `data`['operate_time'] operate_time,\n" +
                "    `data`['is_ordered'] is_ordered,\n" +
                "    `data`['order_time'] order_time,\n" +
                "    `data`['source_type'] source_type,\n" +
                "    `data`['source_id'] source_id\n" +
                "from topic_db\n" +
                "where `database`='gmall-230315-flink'\n" +
                "and `table`='cart_info'\n" +
                "and (\n" +
                "    `type`='insert' \n" +
                "    or \n" +
                "    (`type`='update' and `old`['sku_num'] is not null and cast(`old`['sku_num'] as bigint) < cast(`data`['sku_num'] as bigint))\n" +
                ")");
        tableEnv.createTemporaryView("result_table", resultTable);

        //4.构建KafkaSink表,将数据写出
        tableEnv.executeSql("" +
                "create table dwd_cart_info(\n" +
                "    `id` string,\n" +
                "    `user_id` string,\n" +
                "    `sku_id` string,\n" +
                "    `cart_price` string,\n" +
                "    `sku_num` string,\n" +
                "    `img_url` string,\n" +
                "    `sku_name` string,\n" +
                "    `is_checked` string,\n" +
                "    `create_time` string,\n" +
                "    `operate_time` string,\n" +
                "    `is_ordered` string,\n" +
                "    `order_time` string,\n" +
                "    `source_type` string,\n" +
                "    `source_id` string\n" +
                ")" + KafkaUtil.getKafkaSinkDDL(Constant.TOPIC_DWD_TRADE_CART_ADD));
        tableEnv.executeSql("insert into dwd_cart_info select * from result_table");

//        resultTable.insertInto("dwd_cart_info");
    }

}
